package com.atguigu.flink.chapter12;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/12/22 14:31
 */
public class Flink01_TopN {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 20000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        
        // 1. 建立一个动态表与文件关联
        tEnv.executeSql("create table ub(" +
                            "   userId bigint, " +
                            "   itemId bigint, " +
                            "   categoryId int, " +
                            "   behavior string, " +
                            "   ts bigint, " +
                            "   et as TO_TIMESTAMP_LTZ(ts * 1000, 3), " +
                            "   watermark for et as et - interval '3' second " +
                            ")with(" +
                            "   'connector'='filesystem', " +
                            "   'path'='input/UserBehavior.csv', " +
                            "   'format'='csv' " +
                            ")");
        
        // 2. 按照商品id, 窗口 分组聚合
        Table t1 = tEnv.sqlQuery("select " +
                                     " itemId, " +
                                     " hop_start(et, interval '30' minute ,interval '1' hour) stt, " +
                                     " hop_end(et, interval '30' minute ,interval '1' hour) edt," +
                                     " count(*) ct " +  // sum(1) count(*) count(id)
                                     "from ub " +
                                     "where behavior='pv' " +
                                     "group by itemId, hop(et, interval '30' minute ,interval '1' hour)");
        tEnv.createTemporaryView("t1", t1);
        // 3. 使用over窗口, 给点击量计算名次
        Table t2 = tEnv.sqlQuery("select" +
                                     " itemId, stt, edt, ct, " +
                                     " row_number() over(partition by edt order by ct desc) rn " +  // rank dense_rank row_number
                                     "from t1");
        tEnv.createTemporaryView("t2", t2);
        
        // 4. 过滤出来top3
        Table result = tEnv.sqlQuery("select " +
                                         "  edt w_end," +
                                         "  itemId item_id, " +
                                         "  ct item_count, " +
                                         "  rn rk " +
                                         " from t2 where rn <= 3");
        
        // 5. 把数据写入到mysql中
        tEnv.executeSql("CREATE TABLE `hot_item` (" +
                            "  `w_end` timestamp ," +
                            "  `item_id` bigint," +
                            "  `item_count` bigint," +
                            "  `rk` bigint," +
                            "  PRIMARY KEY (`w_end`,`rk`)  NOT ENFORCED" +
                            ")with(" +
                            "   'connector'='jdbc', " +
                            "   'url' = 'jdbc:mysql://hadoop162:3306/flink_sql'," +
                            "   'table-name' = 'hot_item', " +
                            "   'username' = 'root', " +
                            "   'password' = 'aaaaaa' " +
                            ")");
        
        result.executeInsert("hot_item");
        
    }
}
